In [41]:
import org.apache.spark.sql.DataFrame
/** Calculates the homogeneity cluster scoring.
 *
 * @param df a Spark DataFrame of the input data
 * @param truth name of the column in df where the true class is stored
 * @param pred name of the column in df where the cluster id is stored
 */
def homogeneity_score(df: DataFrame, truth: String, pred: String): Double = {
  import org.apache.spark.sql.functions.{sum, udf, lit}

  def log2(x: Double): Double = scala.math.log(x) / scala.math.log(2.0)

  // Per-row entropy term; counts come out of groupBy().count() as Longs.
  def entropy(count: Long, n: Long, n_k: Long): Double =
    -(count.toDouble / n) * log2(count.toDouble / n_k)

  val udf_entropy = udf(entropy _)

  val n = df.count()
  val classes = df.groupBy(truth).count()
  val clusters = df.groupBy(pred).count().toDF(pred, "count_k")
  // number of members of class c assigned to cluster k
  val n_ck = df.groupBy(truth, pred).count()

  // H(C): entropy of the class distribution
  val entropy_of_classes = (classes
    .withColumn("entropy", udf_entropy(classes("count"), lit(n), lit(n)))
    .agg(sum("entropy"))
    .first()
    .getDouble(0))

  // H(C|K): conditional entropy of the classes given the cluster assignments
  val joined_df = n_ck.as("n_ck").join(clusters, pred)
  val conditional_entropy = (joined_df
    .withColumn("c_entropy", udf_entropy(joined_df("count"), lit(n), joined_df("count_k")))
    .agg(sum("c_entropy"))
    .first()
    .getDouble(0))

  // A single class (H(C) == 0) is perfectly homogeneous by convention.
  if (entropy_of_classes == 0.0) 1.0
  else 1 - conditional_entropy / entropy_of_classes
}
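// For reference, the standard definitions the code above computes:
//   h = 1 - H(C|K) / H(C)
// with
//   H(C)   = -sum_c (n_c / n) * log2(n_c / n)
//   H(C|K) = -sum_{c,k} (n_ck / n) * log2(n_ck / n_k)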
/** Calculates the completeness cluster scoring.
 *
 * @param df a Spark DataFrame of the input data
 * @param truth name of the column in df where the true class is stored
 * @param pred name of the column in df where the cluster id is stored
 * @param two_plus whether to drop truth classes that occur only once before scoring
 */
def completeness_score(df: DataFrame, truth: String, pred: String, two_plus: Boolean = false): Double = {
  import org.apache.spark.sql.functions.{count, lit}
  var filtered_df = df
  if (two_plus) {
    println("filtering for 2+")
    // Keep only rows whose truth class appears more than once.
    filtered_df = (df.groupBy(truth)
      .agg(count(lit(1)).alias("count"))
      .as("df1")
      .join(df.as("df2"), truth)
      .filter("count > 1")
      .select(truth, pred))
  }
  homogeneity_score(filtered_df, pred, truth)
}
/** Calculates the V-measure: the harmonic mean of the homogeneity and completeness cluster scores.
 *
 * @param df a Spark DataFrame of the input data
 * @param truth name of the column in df where the true class is stored
 * @param pred name of the column in df where the cluster id is stored
 */
def v_measurement_score(df: DataFrame, truth: String, pred: String): Double = {
  val h = homogeneity_score(df, truth, pred)
  val c = completeness_score(df, truth, pred)
  // Guard against 0/0 when both scores are zero.
  if (h + c == 0.0) 0.0 else 2 * h * c / (h + c)
}
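// As a harmonic mean, V penalises imbalance between the two scores:
// e.g. h = 0.5 and c = 1.0 give V = 2 * 0.5 * 1.0 / 1.5 ≈ 0.667,
// lower than the arithmetic mean of 0.75.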
In [16]:
import org.apache.spark.sql.Row
// Dataset 1
case class jz_row(truth: String, pred: String)
val table = Seq(
  jz_row("0", "a"), jz_row("0", "a"), jz_row("0", "a"), jz_row("0", "b"),
  jz_row("1", "b"), jz_row("1", "c"), jz_row("1", "c"), jz_row("2", "d"))
var df = spark.createDataFrame(table)
In [17]:
// Dataset 2
val schema = df.schema
val labels_true = "0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5".split(",").toList
val labels_pred = "0,1,1,1,1,1,3,3,3,5,5,5,5,5,5,5,5".split(",").toList
val rows = labels_true zip labels_pred
val rdd = sc.parallelize(rows).map(x => Row(x._1, x._2))
df = spark.createDataFrame(rdd, schema)
In [29]:
// Dataset 3
val schema = df.schema
val labels_true = "0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5,6,7,8".split(",").toList
val labels_pred = "0,1,1,1,1,1,3,3,3,5,5,5,5,5,5,5,5,7,8,9".split(",").toList
val rows = labels_true zip labels_pred
val rdd = sc.parallelize(rows).map(x => Row(x._1, x._2))
df = spark.createDataFrame(rdd, schema)
In [42]:
df.show()
In [43]:
homogeneity_score(df, "truth", "pred")
Out[43]:
In [44]:
completeness_score(df, "truth", "pred")
Out[44]:
In [45]:
completeness_score(df, "truth", "pred", true)
Out[45]:
In [46]:
v_measurement_score(df, "truth", "pred")
Out[46]:
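In [ ]:
// Sanity check (hypothetical extra cell, not part of the original run): in a
// perfect clustering every cluster contains exactly one class and vice versa,
// so all three scores should come out as 1.0.
val perfect = spark.createDataFrame(Seq(
  jz_row("0", "a"), jz_row("0", "a"),
  jz_row("1", "b"),
  jz_row("2", "c")))
homogeneity_score(perfect, "truth", "pred")   // expected: 1.0
completeness_score(perfect, "truth", "pred")  // expected: 1.0
v_measurement_score(perfect, "truth", "pred") // expected: 1.0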